# Copyright 2019 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs all benchmarks in PerfKitBenchmarker.

All benchmarks in PerfKitBenchmarker export the following interface:

GetConfig: this returns the name of the benchmark, the number of machines
         required to run one instance of the benchmark, a detailed description
         of the benchmark, and whether the benchmark requires a scratch disk.
Prepare: this function takes a list of VMs as an input parameter. The benchmark
         will then get all binaries required to run the benchmark and, if
         required, create data files.
Run: this function takes a list of VMs as an input parameter. The benchmark will
     then run the benchmark upon the machines specified. The function will
     return a dictionary containing the results of the benchmark.
Cleanup: this function takes a list of VMs as an input parameter. The benchmark
         will then return the machines to the state they were in before
         Prepare was called.

PerfKitBenchmarker has the following run stages: provision, prepare,
    run, cleanup, teardown, and all.

provision: Read command-line flags, decide what benchmarks to run, and
    create the necessary resources for each benchmark, including
    networks, VMs, disks, and keys, and generate a run_uri, which can
    be used to resume execution at later stages.
prepare: Execute the Prepare function of each benchmark to install
         necessary software, upload datafiles, etc.
run: Execute the Run function of each benchmark and collect the
     generated samples. The publisher may publish these samples
     according to PKB's settings. The Run stage can be called multiple
     times with the run_uri generated by the provision stage.
cleanup: Execute the Cleanup function of each benchmark to uninstall
         software and delete data files.
teardown: Delete VMs, key files, networks, and disks created in the
    'provision' stage.

all: PerfKitBenchmarker will run all of the above stages (provision,
     prepare, run, cleanup, teardown). Any resources generated in the
     provision stage will be automatically deleted in the teardown
     stage, even if there is an error in an earlier stage. When PKB is
     running in this mode, the run cannot be repeated or resumed using
     the run_uri.
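
For example, a run can be split across invocations like the following sketch
(flag values are illustrative):

  ./pkb.py --benchmarks=iperf --run_stage=provision,prepare
  ./pkb.py --benchmarks=iperf --run_stage=run --run_uri=<run_uri>
  ./pkb.py --benchmarks=iperf --run_stage=cleanup,teardown --run_uri=<run_uri>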
"""


import collections
import copy
import getpass
import itertools
import json
import logging
import multiprocessing
from os.path import isfile
import pickle
import random
import re
import sys
import threading
import time
import types
from typing import Dict, List, Optional, Sequence, Set, Tuple
import uuid

from absl import flags
from perfkitbenchmarker import archive
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import benchmark_lookup
from perfkitbenchmarker import benchmark_sets
from perfkitbenchmarker import benchmark_spec as bm_spec
from perfkitbenchmarker import benchmark_status
from perfkitbenchmarker import configs
from perfkitbenchmarker import context
from perfkitbenchmarker import disk
from perfkitbenchmarker import errors
from perfkitbenchmarker import events
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import linux_benchmarks
from perfkitbenchmarker import log_util
from perfkitbenchmarker import os_types
from perfkitbenchmarker import package_lookup
from perfkitbenchmarker import providers
from perfkitbenchmarker import requirements
from perfkitbenchmarker import sample
from perfkitbenchmarker import spark_service
from perfkitbenchmarker import stages
from perfkitbenchmarker import static_virtual_machine
from perfkitbenchmarker import timing_util
from perfkitbenchmarker import traces
from perfkitbenchmarker import version
from perfkitbenchmarker import vm_util
from perfkitbenchmarker import windows_benchmarks
from perfkitbenchmarker.configs import benchmark_config_spec
from perfkitbenchmarker.linux_benchmarks import cluster_boot_benchmark
from perfkitbenchmarker.linux_benchmarks import cuda_memcpy_benchmark
from perfkitbenchmarker.linux_packages import build_tools
from perfkitbenchmarker.publisher import SampleCollector
import six
from six.moves import zip

LOG_FILE_NAME = 'pkb.log'
COMPLETION_STATUS_FILE_NAME = 'completion_statuses.json'
REQUIRED_INFO = ['scratch_disk', 'num_machines']
REQUIRED_EXECUTABLES = frozenset(['ssh', 'ssh-keygen', 'scp', 'openssl'])
MAX_RUN_URI_LENGTH = 12
FLAGS = flags.FLAGS

# Define patterns for help text processing.
BASE_RELATIVE = '../'  # Relative path from markdown output to PKB home for link writing.
MODULE_REGEX = r'^\s+?(.*?):.*'  # Pattern that matches module names.
FLAGS_REGEX = r'(^\s\s--.*?(?=^\s\s--|\Z))+?'  # Pattern that matches each flag.
FLAGNAME_REGEX = r'^\s+?(--.*?)(:.*\Z)'  # Pattern that matches flag name in each flag.
DOCSTRING_REGEX = r'"""(.*?|$)"""'  # Pattern that matches triple quoted comments.

flags.DEFINE_list('ssh_options', [], 'Additional options to pass to ssh.')
flags.DEFINE_boolean('use_ipv6', False, 'Whether to use ipv6 for ssh/scp.')
flags.DEFINE_list('benchmarks', [benchmark_sets.STANDARD_SET],
                  'Benchmarks and/or benchmark sets that should be run. The '
                  'default is the standard set. For more information about '
                  'benchmarks and benchmark sets, see the README and '
                  'benchmark_sets.py.')
flags.DEFINE_boolean('multi_os_benchmark', False, 'Whether this benchmark '
                     'will involve multiple OS types.')
flags.DEFINE_string('archive_bucket', None,
                    'Archive results to the given S3/GCS bucket.')
flags.DEFINE_string('project', None, 'GCP project ID under which '
                    'to create the virtual machines')
flags.DEFINE_multi_string(
    'zone', [],
    'Similar to the --zones flag, but allows the flag to be specified '
    'multiple times on the commandline. For example, --zone=a --zone=b is '
    'equivalent to --zones=a,b. Furthermore, any values specified by --zone '
    'will be appended to those specified by --zones.')
flags.DEFINE_list(
    'zones', [],
    'A list of zones within which to run PerfKitBenchmarker. '
    'This is specific to the cloud provider you are running on. '
    'If multiple zones are given, PerfKitBenchmarker will create one VM in '
    'each zone in turn until enough VMs are created, as specified by each '
    'benchmark. The order in which this flag is applied to VMs is '
    'undefined.')
flags.DEFINE_list(
    'extra_zones', [],
    'Zones that will be appended to the "zones" list. This is functionally '
    'the same, but allows flag matrices to have two zone axes.')
# TODO(user): note that this is currently very GCE specific. Need to create a
#    module which can translate from some generic types to provider specific
#    nomenclature.
flags.DEFINE_string('machine_type', None, 'Machine '
                    'type that will be used for benchmarks that don\'t '
                    'require a particular type.')
flags.DEFINE_integer('num_vms', 1, 'For benchmarks which can make use of a '
                     'variable number of machines, the number of VMs to use.')
flags.DEFINE_string('image', None, 'Default image that will be '
                    'linked to the VM')
flags.DEFINE_string('run_uri', None, 'Name of the Run. If provided, this '
                    'should be alphanumeric and less than or equal to %d '
                    'characters in length.' % MAX_RUN_URI_LENGTH)
flags.DEFINE_boolean('use_pkb_logging', True, 'Whether to use PKB-specific '
                     'logging handlers. Disabling this will use the standard '
                     'ABSL logging directly.')
flags.DEFINE_boolean('log_dmesg', False, 'Whether to log dmesg from '
                     'each VM to the PKB log file before the VM is deleted.')
flags.DEFINE_boolean('always_teardown_on_exception', False, 'Whether to tear '
                     'down VMs when there is an exception during the PKB run. '
                     'If enabled, VMs will be torn down even if '
                     'FLAGS.run_stage does not specify teardown.')
_RESTORE_PATH = flags.DEFINE_string('restore', None,
                                    'Path to restore resources from.')
_FREEZE_PATH = flags.DEFINE_string('freeze', None,
                                   'Path to freeze resources to.')
_COLLECT_MEMINFO = flags.DEFINE_bool('collect_meminfo', False,
                                     'Whether to collect /proc/meminfo stats.')


def GetCurrentUser():
  """Get the current user name.

  On some systems the current user information may be unavailable. In these
  cases we just need a string to tag the created resources with. It should
  not be a fatal error.

  Returns:
    User name OR default string if user name not available.
  """
  try:
    return getpass.getuser()
  except KeyError:
    return 'user_unknown'


flags.DEFINE_string(
    'owner', GetCurrentUser(), 'Owner name. '
    'Used to tag created resources and performance records.')
flags.DEFINE_enum(
    'log_level', log_util.INFO,
    list(log_util.LOG_LEVELS.keys()),
    'The log level to run at.')
flags.DEFINE_enum(
    'file_log_level', log_util.DEBUG, list(log_util.LOG_LEVELS.keys()),
    'Anything logged at this level or higher will be written to the log file.')
flags.DEFINE_integer('duration_in_seconds', None,
                     'Duration of benchmarks '
                     '(only valid for mesh_benchmark).')
flags.DEFINE_string('static_vm_file', None,
                    'The file path for the Static Machine file. See '
                    'static_virtual_machine.py for a description of this file.')
flags.DEFINE_boolean('version', False, 'Display the version and exit.',
                     allow_override_cpp=True)
flags.DEFINE_boolean('time_commands', False, 'Times each command issued.')
flags.DEFINE_enum(
    'scratch_disk_type', None,
    [disk.STANDARD, disk.REMOTE_SSD, disk.PIOPS, disk.LOCAL],
    'Type for all scratch disks. The default is standard')
flags.DEFINE_string(
    'data_disk_type', None,
    'Type for all data disks. If a provider keeps the operating system and '
    'user data on separate disks, this only affects the user data disk(s). '
    'If the provider has OS and user data on the same disk, this flag affects '
    'that disk.')
flags.DEFINE_integer('scratch_disk_size', None, 'Size, in GB, for all scratch '
                     'disks.')
flags.DEFINE_list(
    'data_disk_zones', [],
    'The zone of the data disk. This is only used to provision regional pd with'
    ' multiple zones on GCP.'
    )
flags.DEFINE_integer('data_disk_size', None, 'Size, in GB, for all data disks.')
flags.DEFINE_integer('scratch_disk_iops', None,
                     'IOPS for Provisioned IOPS (SSD) volumes in AWS.')
flags.DEFINE_integer('scratch_disk_throughput', None,
                     'Throughput (MB/s) for volumes in AWS.')
flags.DEFINE_integer('num_striped_disks', None,
                     'The number of data disks to stripe together to form one '
                     '"logical" data disk. This defaults to 1 '
                     '(no striping), except for local disks, which default '
                     'to striping all disks together. The striped disks will '
                     'appear as one disk (data_disk_0) in the metadata.',
                     lower_bound=1)
flags.DEFINE_bool('install_packages', None,
                  'Override for determining whether packages should be '
                  'installed. If this is false, no packages will be installed '
                  'on any VMs. This option should probably only ever be used '
                  'if you have already created an image with all relevant '
                  'packages installed.')
flags.DEFINE_bool(
    'stop_after_benchmark_failure', False,
    'Determines response when running multiple benchmarks serially and a '
    'benchmark run fails. When True, no further benchmarks are scheduled, and '
    'execution ends. When False, benchmarks continue to be scheduled. Does not '
    'apply to keyboard interrupts, which will always prevent further '
    'benchmarks from being scheduled.')
flags.DEFINE_boolean(
    'ignore_package_requirements', False,
    'Disables Python package requirement runtime checks.')
flags.DEFINE_enum('spark_service_type', None,
                  [spark_service.PKB_MANAGED, spark_service.PROVIDER_MANAGED],
                  'Type of spark service to use')
flags.DEFINE_boolean(
    'publish_after_run', False,
    'If true, PKB will publish all samples available immediately after running '
    'each benchmark. This may be useful in scenarios where the PKB run time '
    'for all benchmarks is much greater than a single benchmark.')
flags.DEFINE_integer(
    'publish_period', None,
    'The period in seconds to publish samples from repeated run stages. '
    'This will only publish samples if publish_after_run is True.')
flags.DEFINE_integer(
    'run_stage_time', 0,
    'PKB will run/re-run the run stage of each benchmark until it has spent '
    'at least this many seconds. It defaults to 0, so benchmarks will only '
    'be run once unless some other value is specified. This flag and '
    'run_stage_iterations are mutually exclusive.')
flags.DEFINE_integer(
    'run_stage_iterations', 1,
    'PKB will run/re-run the run stage of each benchmark this many times. '
    'It defaults to 1, so benchmarks will only be run once unless some other '
    'value is specified. This flag and run_stage_time are mutually exclusive.')
flags.DEFINE_integer(
    'run_stage_retries', 0,
    'The number of allowable consecutive failures during the run stage. After '
    'this number of failures any exceptions will cause benchmark termination. '
    'If run_stage_time is exceeded, the run stage will not be retried even if '
    'the number of failures is less than the value of this flag.')
_MAX_RETRIES = flags.DEFINE_integer(
    'retries', 0, 'The number of times PKB should retry each benchmark. '
    'Use with --retry_substatuses to specify which failure substatuses to '
    'retry on. Defaults to all valid substatuses.')
_RETRY_SUBSTATUSES = flags.DEFINE_multi_enum(
    'retry_substatuses', benchmark_status.FailedSubstatus.RETRYABLE_SUBSTATUSES,
    benchmark_status.FailedSubstatus.RETRYABLE_SUBSTATUSES,
    'The failure substatuses to retry on. By default, failed runs are '
    'retried with the same config as the previous run.')
_RETRY_DELAY_SECONDS = flags.DEFINE_integer(
    'retry_delay_seconds', 0, 'The time to wait in between retries.')
_SMART_QUOTA_RETRY = flags.DEFINE_bool(
    'smart_quota_retry', False,
    'If True, causes the benchmark to rerun in a zone in a different region '
    'in the same geo on a quota exception. Currently only works for benchmarks '
    'that specify a single zone (via --zone or --zones). The zone is selected '
    'at random and overrides the --zones flag or the --zone flag, depending on '
    'which is provided. QUOTA_EXCEEDED must be in the list of retry '
    'substatuses for this to work.')
_SMART_CAPACITY_RETRY = flags.DEFINE_bool(
    'smart_capacity_retry', False,
    'If True, causes the benchmark to rerun in a different zone in the same '
    'region on a capacity exception. Currently only works for benchmarks '
    'that specify a single zone (via --zone or --zones). The zone is selected '
    'at random and overrides the --zones flag or the --zone flag, depending on '
    'which is provided. INSUFFICIENT_CAPACITY must be in the list of retry '
    'substatuses for this to work.')
flags.DEFINE_boolean(
    'boot_samples', False,
    'Whether to publish boot time samples for all tests.')
_MEASURE_DELETE = flags.DEFINE_boolean(
    'delete_samples', False,
    'Whether to publish delete time samples for all tests.')
flags.DEFINE_boolean(
    'gpu_samples', False,
    'Whether to publish GPU memcpy bandwidth samples for GPU tests.')
flags.DEFINE_integer(
    'run_processes', None,
    'The number of parallel processes to use to run benchmarks.',
    lower_bound=1)
flags.DEFINE_float(
    'run_processes_delay', None,
    'The delay in seconds between parallel processes\' invocation. '
    'Increasing this value may reduce provider throttling issues.',
    lower_bound=0)
flags.DEFINE_string(
    'completion_status_file', None,
    'If specified, this file will contain the completion status of each '
    'benchmark that ran (SUCCEEDED, FAILED, or SKIPPED). The file has one json '
    'object per line, each with the following format:\n'
    '{ "name": <benchmark name>, "flags": <flags dictionary>, '
    '"status": <completion status> }')
flags.DEFINE_string(
    'helpmatch', '',
    'Shows only flags defined in a module whose name matches the given regex.',
    allow_override_cpp=True)
flags.DEFINE_string(
    'helpmatchmd', '',
    'helpmatch query with markdown friendly output. '
    'Shows only flags defined in a module whose name matches the given regex.',
    allow_override_cpp=True)
flags.DEFINE_boolean(
    'create_failed_run_samples', False,
    'If true, PKB will create a sample specifying that a run stage failed. '
    'This sample will include metadata specifying the run stage that '
    'failed, the exception that occurred, as well as all the flags that '
    'were provided to PKB on the command line.')
flags.DEFINE_boolean(
    'create_started_run_sample', False,
    'Whether PKB will create a sample at the start of the provision phase of '
    'the benchmark run.')
flags.DEFINE_integer(
    'failed_run_samples_error_length', 10240,
    'If create_failed_run_samples is true, PKB will truncate any error '
    'messages at failed_run_samples_error_length.')
flags.DEFINE_boolean(
    'dry_run', False,
    'If true, PKB will print the flag configurations to be run and exit. '
    'The configurations are generated from the command line flags, the '
    'flag_matrix, and flag_zip.')
flags.DEFINE_string(
    'skip_pending_runs_file', None,
    'If this file exists, any pending runs will not be executed.')
flags.DEFINE_boolean(
    'use_vpn', False,
    'Creates VPN tunnels between vm_groups')
flags.DEFINE_integer(
    'after_prepare_sleep_time', 0,
    'The time in seconds to sleep after the prepare phase. This can be useful '
    'for letting burst tokens accumulate.')
flags.DEFINE_integer(
    'after_run_sleep_time', 0,
    'The time in seconds to sleep after the run phase. This can be useful '
    'for letting the VM sit idle after the benchmarking phase is complete.')
flags.DEFINE_bool(
    'before_run_pause', False,
    'If true, wait for command line input before executing the run phase. '
    'This is useful for debugging benchmarks during development.')
flags.DEFINE_bool(
    'before_cleanup_pause', False,
    'If true, wait for command line input before executing the cleanup phase. '
    'This is useful for debugging benchmarks during development.')
flags.DEFINE_integer(
    'timeout_minutes', 240,
    'An upper bound on the time in minutes that the benchmark is expected to '
    'run. This time is annotated or tagged on the resources of cloud '
    'providers. Note that for retries, this applies to each individual retry.')
flags.DEFINE_integer(
    'persistent_timeout_minutes', 240,
    'An upper bound on the time in minutes that resources left behind by the '
    'benchmark. Some benchmarks purposefully create resources for other '
    'benchmarks to use. Persistent timeout specifies how long these shared '
    'resources should live.')
flags.DEFINE_bool('disable_interrupt_moderation', False,
                  'Turn off the interrupt moderation networking feature')
flags.DEFINE_bool('disable_rss', False,
                  'Whether or not to disable the Receive Side Scaling feature.')
flags.DEFINE_boolean('record_lscpu', True,
                     'Whether to record the lscpu output in a sample')
flags.DEFINE_boolean('record_proccpu', True,
                     'Whether to record the /proc/cpuinfo output in a sample')
flags.DEFINE_boolean('record_cpu_vuln', True,
                     'Whether to record the CPU vulnerabilities on linux VMs')
flags.DEFINE_boolean('record_gcc', True,
                     'Whether to record the gcc version in a sample')
flags.DEFINE_boolean('record_glibc', True,
                     'Whether to record the glibc version in a sample')
# Support for using a proxy in the cloud environment.
flags.DEFINE_string('http_proxy', '',
                    'Specify a proxy for HTTP in the form '
                    '[user:passwd@]proxy.server:port.')
flags.DEFINE_string('https_proxy', '',
                    'Specify a proxy for HTTPS in the form '
                    '[user:passwd@]proxy.server:port.')
flags.DEFINE_string('ftp_proxy', '',
                    'Specify a proxy for FTP in the form '
                    '[user:passwd@]proxy.server:port.')
flags.DEFINE_string('no_proxy', '',
                    'Specify host(s) to exclude from proxy, e.g. '
                    '--no_proxy=localhost,.example.com,192.168.0.1')
flags.DEFINE_bool('randomize_run_order', False,
                  'When running more than one benchmark, '
                  'randomize the order of the benchmarks.')

_TEARDOWN_EVENT = multiprocessing.Event()

events.initialization_complete.connect(traces.RegisterAll)


@flags.multi_flags_validator(
    ['smart_quota_retry', 'smart_capacity_retry', 'retries', 'zones', 'zone'],
    message='Smart zone retries require exactly one zone from --zones '
    'or --zone, as well as a retry count > 0.')
def ValidateSmartZoneRetryFlags(flags_dict):
  """Validates smart zone retry flags."""
  if flags_dict['smart_quota_retry'] or flags_dict['smart_capacity_retry']:
    if flags_dict['retries'] == 0:
      return False
    return (len(flags_dict['zones']) == 1 and
            not flags_dict['zone']) or (len(flags_dict['zone']) == 1 and
                                        not flags_dict['zones'])
  return True
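
# For example (flag values illustrative): --smart_quota_retry --retries=1
# --zone=us-east1-b passes this validation, while --smart_quota_retry
# --retries=1 --zones=us-east1-b,us-east1-c fails because more than one zone
# is given.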


@flags.multi_flags_validator(
    ['retries', 'run_stage'],
    message='Retries requires running all stages of the benchmark.')
def ValidateRetriesAndRunStages(flags_dict):
  if flags_dict['retries'] > 0 and flags_dict['run_stage'] != stages.STAGES:
    return False
  return True


def _InjectBenchmarkInfoIntoDocumentation():
  """Appends each benchmark's information to the main module's docstring."""
  # TODO: Verify whether there is another way of appending an additional help
  # message.
  # Inject more help documentation.
  # The following appends descriptions of the benchmarks and descriptions of
  # the benchmark sets to the help text.
  benchmark_sets_list = [
      '%s:  %s' %
      (set_name, benchmark_sets.BENCHMARK_SETS[set_name]['message'])
      for set_name in benchmark_sets.BENCHMARK_SETS]
  sys.modules['__main__'].__doc__ = (
      'PerfKitBenchmarker version: {version}\n\n{doc}\n'
      'Benchmarks (default requirements):\n'
      '\t{benchmark_doc}').format(
          version=version.VERSION,
          doc=__doc__,
          benchmark_doc=_GenerateBenchmarkDocumentation())
  sys.modules['__main__'].__doc__ += ('\n\nBenchmark Sets:\n\t%s'
                                      % '\n\t'.join(benchmark_sets_list))


def _ParseFlags(argv=sys.argv):
  """Parses the command-line flags."""
  try:
    argv = FLAGS(argv)
  except flags.Error as e:
    logging.error(e)
    logging.info('For usage instructions, use --helpmatch={module_name}')
    logging.info('For example, ./pkb.py --helpmatch=benchmarks.fio')
    sys.exit(1)


def _PrintHelp(matches=None):
  """Prints help for flags defined in matching modules.

  Args:
    matches: regex string or None. Filters help to only flags defined in
      modules whose names match the regex. If None then all flags are printed.
  """
  if not matches:
    print(FLAGS)
  else:
    flags_by_module = FLAGS.flags_by_module_dict()
    modules = sorted(flags_by_module)
    regex = re.compile(matches)
    for module_name in modules:
      if regex.search(module_name):
        print(FLAGS.module_help(module_name))


def _PrintHelpMD(matches=None):
  """Prints markdown formatted help for flags defined in matching modules.

  Works just like --helpmatch.

  Args:
    matches: regex string or None. Filters help to only flags defined in
      modules whose names match the regex. If None then all flags are printed.

  Eg:
  * all flags: `./pkb.py --helpmatchmd .*` > testsuite_docs/all.md
  * linux benchmarks: `./pkb.py --helpmatchmd linux_benchmarks.*` >
    testsuite_docs/linux_benchmarks.md
  * specific modules: `./pkb.py --helpmatchmd iperf` > testsuite_docs/iperf.md
  * windows packages: `./pkb.py --helpmatchmd windows_packages.*` >
    testsuite_docs/windows_packages.md
  * GCP provider: `./pkb.py --helpmatchmd providers.gcp.*` >
    testsuite_docs/providers_gcp.md
  """

  flags_by_module = FLAGS.flags_by_module_dict()
  modules = sorted(flags_by_module)
  regex = re.compile(matches)
  for module_name in modules:
    if regex.search(module_name):
      # Compile regex patterns.
      module_regex = re.compile(MODULE_REGEX)
      flags_regex = re.compile(FLAGS_REGEX, re.MULTILINE | re.DOTALL)
      flagname_regex = re.compile(FLAGNAME_REGEX, re.MULTILINE | re.DOTALL)
      docstring_regex = re.compile(DOCSTRING_REGEX, re.MULTILINE | re.DOTALL)
      # Retrieve the helpmatch text to format.
      helptext_raw = FLAGS.module_help(module_name)

      # Converts module name to github linkable string.
      # eg: perfkitbenchmarker.linux_benchmarks.iperf_vpn_benchmark ->
      # perfkitbenchmarker/linux_benchmarks/iperf_vpn_benchmark.py
      module = re.search(
          module_regex,
          helptext_raw,
      ).group(1)
      module_link = module.replace('.', '/') + '.py'
      # Put flag name in a markdown code block for visibility.
      flags = re.findall(flags_regex, helptext_raw)
      flags[:] = [flagname_regex.sub(r'`\1`\2', flag) for flag in flags]
      # Get the docstring for the module without importing everything into our
      # namespace. There is probably a better way to do this.
      docstring = 'No description available'
      # Only pull docstrings from inside PKB source files.
      if isfile(module_link):
        with open(module_link, 'r') as f:
          source = f.read()
          # Get the triple quoted matches.
          docstring_match = re.search(docstring_regex, source)
          # Some modules don't have docstrings.
          # eg perfkitbenchmarker/providers/alicloud/flags.py
          if docstring_match is not None:
            docstring = docstring_match.group(1)
      # Format output and print here.
      if isfile(module_link):  # Only print links for modules we can find.
        print('### [' + module + '](' + BASE_RELATIVE + module_link + ')\n')
      else:
        print('### ' + module + '\n')
      print('#### Description:\n\n' + docstring + '\n\n#### Flags:\n')
      print('\n'.join(flags) + '\n')


def CheckVersionFlag():
  """If the --version flag was specified, prints the version and exits."""
  if FLAGS.version:
    print(version.VERSION)
    sys.exit(0)


def _InitializeRunUri():
  """Determines the PKB run URI and sets FLAGS.run_uri."""
  if FLAGS.run_uri is None:
    if stages.PROVISION in FLAGS.run_stage:
      FLAGS.run_uri = str(uuid.uuid4())[-8:]
    else:
      # Attempt to get the last modified run directory.
      run_uri = vm_util.GetLastRunUri()
      if run_uri:
        FLAGS.run_uri = run_uri
        logging.warning(
            'No run_uri specified. Attempting to run the following stages with '
            '--run_uri=%s: %s', FLAGS.run_uri, ', '.join(FLAGS.run_stage))
      else:
        raise errors.Setup.NoRunURIError(
            'No run_uri specified. Could not run the following stages: %s' %
            ', '.join(FLAGS.run_stage))
  elif not FLAGS.run_uri.isalnum() or len(FLAGS.run_uri) > MAX_RUN_URI_LENGTH:
    raise errors.Setup.BadRunURIError('run_uri must be alphanumeric and less '
                                      'than or equal to %d characters in '
                                      'length.' % MAX_RUN_URI_LENGTH)
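
# For example: a run that includes the provision stage and omits --run_uri
# gets a generated 8-character URI (e.g. '1a2b3c4d', illustrative); later
# invocations can resume the same resources with --run_uri=1a2b3c4d.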


def _CreateBenchmarkSpecs():
  """Create a list of BenchmarkSpecs for each benchmark run to be scheduled.

  Returns:
    A list of BenchmarkSpecs.
  """
  specs = []
  benchmark_tuple_list = benchmark_sets.GetBenchmarksFromFlags()
  benchmark_counts = collections.defaultdict(itertools.count)
  for benchmark_module, user_config in benchmark_tuple_list:
    # Construct benchmark config object.
    name = benchmark_module.BENCHMARK_NAME
    expected_os_types = None if FLAGS.multi_os_benchmark else (
        os_types.WINDOWS_OS_TYPES if FLAGS.os_type in os_types.WINDOWS_OS_TYPES
        else os_types.LINUX_OS_TYPES)
    with flag_util.OverrideFlags(FLAGS, user_config.get('flags')):
      config_dict = benchmark_module.GetConfig(user_config)
    config_spec_class = getattr(
        benchmark_module, 'BENCHMARK_CONFIG_SPEC_CLASS',
        benchmark_config_spec.BenchmarkConfigSpec)
    config = config_spec_class(name, expected_os_types=expected_os_types,
                               flag_values=FLAGS, **config_dict)

    # Assign a unique ID to each benchmark run. This differs even between two
    # runs of the same benchmark within a single PKB run.
    uid = name + str(next(benchmark_counts[name]))

    # Optional step to check flag values and verify files exist.
    check_prereqs = getattr(benchmark_module, 'CheckPrerequisites', None)
    if check_prereqs:
      try:
        with config.RedirectFlags(FLAGS):
          check_prereqs(config)
      except:
        logging.exception('Prerequisite check failed for %s', name)
        raise

    specs.append(
        bm_spec.BenchmarkSpec.GetBenchmarkSpec(benchmark_module, config, uid))

  return specs


def _WriteCompletionStatusFile(benchmark_specs, status_file):
  """Writes a completion status file.

  The file has one json object per line, each with the following format:

  {
    "name": <benchmark name>,
    "status": <completion status>,
    "failed_substatus": <failed substatus>,
    "status_detail": <descriptive string (if present)>,
    "flags": <flags dictionary>
  }
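
  For example, one line might read (values illustrative):

  {"name": "iperf", "status": "SUCCEEDED", "flags": {"zones": "us-east1-b"}}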

  Args:
    benchmark_specs: The list of BenchmarkSpecs that ran.
    status_file: The file object to write the json structures to.
  """
  for spec in benchmark_specs:
    # OrderedDict so that we preserve key order in json file
    status_dict = collections.OrderedDict()
    status_dict['name'] = spec.name
    status_dict['status'] = spec.status
    if spec.failed_substatus:
      status_dict['failed_substatus'] = spec.failed_substatus
    if spec.status_detail:
      status_dict['status_detail'] = spec.status_detail
    status_dict['flags'] = spec.config.flags
    status_file.write(json.dumps(status_dict) + '\n')


def _SetRestoreSpec(spec: bm_spec.BenchmarkSpec) -> None:
  """Unpickles the spec to restore resources from, if provided."""
  restore_path = _RESTORE_PATH.value
  if restore_path:
    logging.info('Using restore spec at path: %s', restore_path)
    with open(restore_path, 'rb') as spec_file:
      spec.restore_spec = pickle.load(spec_file)


def _SetFreezePath(spec: bm_spec.BenchmarkSpec) -> None:
  """Sets the path to freeze resources to if provided."""
  if _FREEZE_PATH.value:
    spec.freeze_path = _FREEZE_PATH.value
    logging.info('Using freeze path: %s', spec.freeze_path)


def DoProvisionPhase(spec, timer):
  """Performs the Provision phase of benchmark execution.

  Args:
    spec: The BenchmarkSpec created for the benchmark.
    timer: An IntervalTimer that measures the start and stop times of resource
      provisioning.
  """
  if FLAGS.create_started_run_sample:
    PublishRunStartedSample(spec)
  logging.info('Provisioning resources for benchmark %s', spec.name)
  spec.ConstructContainerCluster()
  spec.ConstructContainerRegistry()
  # The spark service needs to be constructed first because it adds some VMs.
  spec.ConstructSparkService()
  spec.ConstructDpbService()
  spec.ConstructVirtualMachines()
  spec.ConstructRelationalDb()
  spec.ConstructSpanner()
  spec.ConstructNonRelationalDb()
  # CapacityReservations need to be constructed after VirtualMachines because
  # it needs information about the VMs (machine type, count, zone, etc). The
  # CapacityReservations will be provisioned before VMs.
  spec.ConstructCapacityReservations()
  spec.ConstructTpu()
  spec.ConstructEdwService()
  spec.ConstructVPNService()
  spec.ConstructNfsService()
  spec.ConstructSmbService()
  # Pickle the spec before we try to create anything so we can clean
  # everything up on a second run if something goes wrong.
  spec.Pickle()
  events.benchmark_start.send(benchmark_spec=spec)
  try:
    with timer.Measure('Resource Provisioning'):
      spec.Provision()
  finally:
    # Also pickle the spec after the resources are created so that
    # we have a record of things like AWS ids. Otherwise we won't
    # be able to clean them up on a subsequent run.
    spec.Pickle()


class InterruptChecker():
  """An class that check interrupt on VM."""

  def __init__(self, vms):
    """Start check interrupt thread.

    Args:
      vms: A list of virtual machines.
    """
    self.vms = vms
    self.check_threads = []
    self.phase_status = threading.Event()
    for vm in vms:
      if vm.IsInterruptible():
        check_thread = threading.Thread(target=self.CheckInterrupt, args=(vm,))
        check_thread.start()
        self.check_threads.append(check_thread)

  def CheckInterrupt(self, vm):
    """Check interrupt.

    Args:
      vm: the virtual machine object.

    Returns:
      None
    """
    while not self.phase_status.is_set():
      vm.UpdateInterruptibleVmStatus(use_api=False)
      if vm.WasInterrupted():
        return
      else:
        self.phase_status.wait(vm.GetInterruptableStatusPollSeconds())

  def EndCheckInterruptThread(self):
    """End check interrupt thread."""
    self.phase_status.set()

    for check_thread in self.check_threads:
      check_thread.join()

  def EndCheckInterruptThreadAndRaiseError(self):
    """End check interrupt thread and raise error.

    Raises:
      InsufficientCapacityCloudFailure when it catches interrupt.

    Returns:
      None
    """
    self.EndCheckInterruptThread()
    if any(vm.IsInterruptible() and vm.WasInterrupted() for vm in self.vms):
      raise errors.Benchmarks.InsufficientCapacityCloudFailure('Interrupt')
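
# A typical usage pattern (mirroring RunBenchmark below): start the checker
# around a phase, then end it and surface any interruption as an error.
# A sketch:
#
#   interrupt_checker = InterruptChecker(spec.vms)
#   DoPreparePhase(spec, detailed_timer)
#   interrupt_checker.EndCheckInterruptThreadAndRaiseError()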


def DoPreparePhase(spec, timer):
  """Performs the Prepare phase of benchmark execution.

  Args:
    spec: The BenchmarkSpec created for the benchmark.
    timer: An IntervalTimer that measures the start and stop times of the
      benchmark module's Prepare function.
  """
  logging.info('Preparing benchmark %s', spec.name)
  with timer.Measure('BenchmarkSpec Prepare'):
    spec.Prepare()
  with timer.Measure('Benchmark Prepare'):
    spec.BenchmarkPrepare(spec)
  spec.StartBackgroundWorkload()
  if FLAGS.after_prepare_sleep_time:
    logging.info('Sleeping for %s seconds after the prepare phase.',
                 FLAGS.after_prepare_sleep_time)
    time.sleep(FLAGS.after_prepare_sleep_time)


def DoRunPhase(spec, collector, timer):
  """Performs the Run phase of benchmark execution.

  Args:
    spec: The BenchmarkSpec created for the benchmark.
    collector: The SampleCollector object to add samples to.
    timer: An IntervalTimer that measures the start and stop times of the
      benchmark module's Run function.
  """
  if FLAGS.before_run_pause:
    six.moves.input('Hit enter to begin Run.')
  deadline = time.time() + FLAGS.run_stage_time
  run_number = 0
  consecutive_failures = 0
  last_publish_time = time.time()

  def _IsRunStageFinished():
    if FLAGS.run_stage_time > 0:
      return time.time() > deadline
    else:
      return run_number >= FLAGS.run_stage_iterations
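
  # For example (illustrative): with --run_stage_time=600 the loop below
  # repeats the run stage until at least 600 seconds have elapsed; with the
  # default --run_stage_time=0 and --run_stage_iterations=3 it runs exactly
  # three times.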

  while True:
    samples = []
    logging.info('Running benchmark %s', spec.name)
    events.before_phase.send(events.RUN_PHASE, benchmark_spec=spec)
    try:
      with timer.Measure('Benchmark Run'):
        samples = spec.BenchmarkRun(spec)
    except Exception:
      consecutive_failures += 1
      if consecutive_failures > FLAGS.run_stage_retries:
        raise
      logging.exception('Run failed (consecutive_failures=%s); retrying.',
                        consecutive_failures)
    else:
      consecutive_failures = 0
    finally:
      events.after_phase.send(events.RUN_PHASE, benchmark_spec=spec)
    if FLAGS.run_stage_time or FLAGS.run_stage_iterations:
      for s in samples:
        s.metadata['run_number'] = run_number

    # Add boot time metrics on the first run iteration.
    if run_number == 0 and (FLAGS.boot_samples or
                            spec.name == cluster_boot_benchmark.BENCHMARK_NAME):
      samples.extend(cluster_boot_benchmark.GetTimeToBoot(spec.vms))

    # In order to collect GPU samples, one of the VMs must have both an Nvidia
    # GPU and the nvidia-smi tool installed.
    if FLAGS.gpu_samples:
      samples.extend(cuda_memcpy_benchmark.Run(spec))

    if FLAGS.record_lscpu:
      samples.extend(_CreateLscpuSamples(spec.vms))

    if FLAGS.record_proccpu:
      samples.extend(_CreateProcCpuSamples(spec.vms))
    if FLAGS.record_cpu_vuln and run_number == 0:
      samples.extend(_CreateCpuVulnerabilitySamples(spec.vms))

    if FLAGS.record_gcc:
      samples.extend(_CreateGccSamples(spec.vms))
    if FLAGS.record_glibc:
      samples.extend(_CreateGlibcSamples(spec.vms))

    events.samples_created.send(
        events.RUN_PHASE, benchmark_spec=spec, samples=samples)
    collector.AddSamples(samples, spec.name, spec)
    if (FLAGS.publish_after_run and FLAGS.publish_period is not None and
        FLAGS.publish_period < (time.time() - last_publish_time)):
      collector.PublishSamples()
      last_publish_time = time.time()
    run_number += 1
    if _IsRunStageFinished():
      if FLAGS.after_run_sleep_time:
        logging.info('Sleeping for %s seconds after the run phase.',
                     FLAGS.after_run_sleep_time)
        time.sleep(FLAGS.after_run_sleep_time)
      break


def DoCleanupPhase(spec, timer):
  """Performs the Cleanup phase of benchmark execution.

  Cleanup phase work should be delegated to spec.BenchmarkCleanup to allow
  non-PKB based cleanup if needed.

  Args:
    spec: The BenchmarkSpec created for the benchmark.
    timer: An IntervalTimer that measures the start and stop times of the
      benchmark module's Cleanup function.
  """
  if FLAGS.before_cleanup_pause:
    six.moves.input('Hit enter to begin Cleanup.')
  logging.info('Cleaning up benchmark %s', spec.name)
  if (spec.always_call_cleanup or any([vm.is_static for vm in spec.vms]) or
      spec.dpb_service is not None):
    spec.StopBackgroundWorkload()
    with timer.Measure('Benchmark Cleanup'):
      spec.BenchmarkCleanup(spec)


def DoTeardownPhase(spec, collector, timer):
  """Performs the Teardown phase of benchmark execution.

  Teardown phase work should be delegated to spec.Delete to allow non-PKB based
  teardown if needed.

  Args:
    spec: The BenchmarkSpec created for the benchmark.
    collector: The SampleCollector object to add samples to
      (if collecting delete samples)
    timer: An IntervalTimer that measures the start and stop times of
      resource teardown.
  """
  logging.info('Tearing down resources for benchmark %s', spec.name)
  # Add delete time metrics after metadata is collected.
  if _MEASURE_DELETE.value:
    samples = cluster_boot_benchmark.MeasureDelete(spec.vms)
    collector.AddSamples(samples, spec.name, spec)

  spec.Freeze()

  with timer.Measure('Resource Teardown'):
    spec.Delete()


def _SkipPendingRunsFile():
  if FLAGS.skip_pending_runs_file and isfile(FLAGS.skip_pending_runs_file):
    logging.warning('%s exists.  Skipping benchmark.',
                    FLAGS.skip_pending_runs_file)
    return True
  else:
    return False

_SKIP_PENDING_RUNS_CHECKS = []


def RegisterSkipPendingRunsCheck(func):
  """Registers a function to skip pending runs.

  Args:
    func: A function which returns True if pending runs should be skipped.
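
  Example (a hypothetical deadline-based check; names are illustrative):

    deadline = time.time() + 3600
    RegisterSkipPendingRunsCheck(lambda: time.time() > deadline)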
  """
  _SKIP_PENDING_RUNS_CHECKS.append(func)


def PublishRunStartedSample(spec):
  """Publishes a sample indicating that a run has started.

  This sample is published immediately so that there exists some metric for any
  run (even if the process dies).

  Args:
    spec: The BenchmarkSpec object with run information.
  """
  collector = SampleCollector()
  metadata = {
      'flags': str(flag_util.GetProvidedCommandLineFlags())
  }
  collector.AddSamples(
      [sample.Sample('Run Started', 1, 'Run Started', metadata)],
      spec.name, spec)
  collector.PublishSamples()


def RunBenchmark(spec, collector):
  """Runs a single benchmark and adds the results to the collector.

  Args:
    spec: The BenchmarkSpec object with run information.
    collector: The SampleCollector object to add samples to.
  """

  # Since there are issues with the handling of SIGINT/KeyboardInterrupt (see
  # further discussion in _BackgroundProcessTaskManager), this mechanism is
  # provided for defense in depth to force skip pending runs after SIGINT.
  for f in _SKIP_PENDING_RUNS_CHECKS:
    if f():
      logging.warning('Skipping benchmark.')
      return

  spec.status = benchmark_status.FAILED
  current_run_stage = stages.PROVISION
  # Modify the logger prompt for messages logged within this function.
  label_extension = '{}({}/{})'.format(
      spec.name, spec.sequence_number, spec.total_benchmarks)
  context.SetThreadBenchmarkSpec(spec)
  log_context = log_util.GetThreadLogContext()
  with log_context.ExtendLabel(label_extension):
    with spec.RedirectGlobalFlags():
      end_to_end_timer = timing_util.IntervalTimer()
      detailed_timer = timing_util.IntervalTimer()
      interrupt_checker = None
      try:
        with end_to_end_timer.Measure('End to End'):

          _SetRestoreSpec(spec)
          _SetFreezePath(spec)

          if stages.PROVISION in FLAGS.run_stage:
            DoProvisionPhase(spec, detailed_timer)

          if stages.PREPARE in FLAGS.run_stage:
            current_run_stage = stages.PREPARE
            interrupt_checker = InterruptChecker(spec.vms)
            DoPreparePhase(spec, detailed_timer)
            interrupt_checker.EndCheckInterruptThreadAndRaiseError()
            interrupt_checker = None

          if stages.RUN in FLAGS.run_stage:
            current_run_stage = stages.RUN
            interrupt_checker = InterruptChecker(spec.vms)
            DoRunPhase(spec, collector, detailed_timer)
            interrupt_checker.EndCheckInterruptThreadAndRaiseError()
            interrupt_checker = None

          if stages.CLEANUP in FLAGS.run_stage:
            current_run_stage = stages.CLEANUP
            interrupt_checker = InterruptChecker(spec.vms)
            DoCleanupPhase(spec, detailed_timer)
            interrupt_checker.EndCheckInterruptThreadAndRaiseError()
            interrupt_checker = None

          if stages.TEARDOWN in FLAGS.run_stage:
            current_run_stage = stages.TEARDOWN
            DoTeardownPhase(spec, collector, detailed_timer)

        # Add timing samples.
        if (FLAGS.run_stage == stages.STAGES and
            timing_util.EndToEndRuntimeMeasurementEnabled()):
          collector.AddSamples(
              end_to_end_timer.GenerateSamples(), spec.name, spec)
        if timing_util.RuntimeMeasurementsEnabled():
          collector.AddSamples(
              detailed_timer.GenerateSamples(), spec.name, spec)

        # Add resource related samples.
        collector.AddSamples(spec.GetSamples(), spec.name, spec)
      # The except block will clean up benchmark-specific resources on
      # exception. It may also clean up generic resources based on
      # FLAGS.always_teardown_on_exception.
      except (Exception, KeyboardInterrupt) as e:
        # Log specific type of failure, if known
        # TODO(dlott) Move to exception chaining with Python3 support
        if (isinstance(e, errors.Benchmarks.InsufficientCapacityCloudFailure)
            or 'InsufficientCapacityCloudFailure' in str(e)):
          spec.failed_substatus = (
              benchmark_status.FailedSubstatus.INSUFFICIENT_CAPACITY)
        elif (isinstance(e, errors.Benchmarks.QuotaFailure)
              or 'QuotaFailure' in str(e)):
          spec.failed_substatus = benchmark_status.FailedSubstatus.QUOTA
        elif isinstance(e, errors.Benchmarks.KnownIntermittentError):
          spec.failed_substatus = (
              benchmark_status.FailedSubstatus.KNOWN_INTERMITTENT)
        else:
          spec.failed_substatus = (
              benchmark_status.FailedSubstatus.UNCATEGORIZED)
        spec.status_detail = str(e)

        # Resource cleanup (below) can take a long time. Log the error to give
        # immediate feedback, then re-throw.
        logging.exception('Error during benchmark %s', spec.name)
        if FLAGS.create_failed_run_samples:
          collector.AddSamples(MakeFailedRunSample(
              spec, str(e), current_run_stage), spec.name, spec)

        # If the particular benchmark requests us to always call cleanup, do it
        # here.
        if stages.CLEANUP in FLAGS.run_stage and spec.always_call_cleanup:
          DoCleanupPhase(spec, detailed_timer)

        if (FLAGS.always_teardown_on_exception and
            stages.TEARDOWN not in FLAGS.run_stage):
          # Note that if TEARDOWN is specified, it will happen below.
          DoTeardownPhase(spec, collector, detailed_timer)
        raise
      # The finally block will only clean up generic resources if teardown is
      # included in FLAGS.run_stage.
      finally:
        if interrupt_checker:
          interrupt_checker.EndCheckInterruptThread()
        # Deleting resources should happen first so any errors with publishing
        # don't prevent teardown.
        if stages.TEARDOWN in FLAGS.run_stage:
          spec.Delete()
        if FLAGS.publish_after_run:
          collector.PublishSamples()
        events.benchmark_end.send(benchmark_spec=spec)
        # Pickle spec to save final resource state.
        spec.Pickle()
  spec.status = benchmark_status.SUCCEEDED


def MakeFailedRunSample(spec, error_message, run_stage_that_failed):
  """Create a sample.Sample representing a failed run stage.

  The sample metric will have the name 'Run Failed';
  the value will be 1 (has to be convertible to a float),
  and the unit will be 'Run Failed' (for lack of a better idea).

  The sample metadata will include the error message from the
  Exception, the run stage that failed, as well as all PKB
  command line flags that were passed in.

  Args:
    spec: benchmark_spec
    error_message: error message that was caught, resulting in the
      run stage failure.
    run_stage_that_failed: run stage that failed by raising an Exception

  Returns:
    a sample.Sample representing the run stage failure.
  """
  # Note: currently all provided PKB command line flags are included in the
  # metadata. We may want to only include flags specific to the benchmark that
  # failed. This can be accomplished using gflags' FlagsByModuleDict().
  metadata = {
      'error_message': error_message[0:FLAGS.failed_run_samples_error_length],
      'run_stage': run_stage_that_failed,
      'flags': str(flag_util.GetProvidedCommandLineFlags())
  }
  vm_util.RunThreaded(lambda vm: vm.UpdateInterruptibleVmStatus(use_api=True),
                      spec.vms)

  interruptible_vm_count = 0
  interrupted_vm_count = 0
  vm_status_codes = []
  for vm in spec.vms:
    if vm.IsInterruptible():
      interruptible_vm_count += 1
      if vm.WasInterrupted():
        interrupted_vm_count += 1
        spec.failed_substatus = (
            benchmark_status.FailedSubstatus.INTERRUPTED)
        status_code = vm.GetVmStatusCode()
        if status_code:
          vm_status_codes.append(status_code)

  if spec.failed_substatus:
    metadata['failed_substatus'] = spec.failed_substatus

  if interruptible_vm_count:
    metadata.update({'interruptible_vms': interruptible_vm_count,
                     'interrupted_vms': interrupted_vm_count,
                     'vm_status_codes': vm_status_codes})
  if interrupted_vm_count:
    logging.error(
        '%d interruptible VMs were interrupted in this failed PKB run.',
        interrupted_vm_count)
  return [sample.Sample('Run Failed', 1, 'Run Failed', metadata)]


def _ShouldRetry(spec: bm_spec.BenchmarkSpec) -> bool:
  """Returns whether the benchmark run should be retried."""
  return (spec.status == benchmark_status.FAILED and
          spec.failed_substatus in _RETRY_SUBSTATUSES.value)


def RunBenchmarkTask(
    spec: bm_spec.BenchmarkSpec
) -> Tuple[Sequence[bm_spec.BenchmarkSpec], List[sample.Sample]]:
  """Task that executes RunBenchmark.

  This is designed to be used with RunParallelProcesses. Note that
  for retries only the last run has its samples published.

  Args:
    spec: BenchmarkSpec. The spec to call RunBenchmark with.

  Returns:
    A BenchmarkSpec for each run iteration and a list of samples from the
    last run.
  """
  # Many providers name resources using run_uris. When running multiple
  # benchmarks in parallel, this causes name collisions on resources.
  # By modifying the run_uri, we avoid the collisions.
  if FLAGS.run_processes and FLAGS.run_processes > 1:
    spec.config.flags['run_uri'] = FLAGS.run_uri + str(spec.sequence_number)
    # Unset run_uri so the config value takes precedence.
    FLAGS['run_uri'].present = 0

  # Set the run count.
  max_run_count = 1 + _MAX_RETRIES.value
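  # For example, --retries=2 allows up to three total attempts per benchmark.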

  # Useful format string for debugging.
  benchmark_info = (
      f'{spec.sequence_number}/{spec.total_benchmarks} '
      f'{spec.name} (UID: {spec.uid})'
  )

  zone_retry_manager = ZoneRetryManager()
  result_specs = []
  for current_run_count in range(max_run_count):
    # Attempt to return the most recent results.
    if _TEARDOWN_EVENT.is_set():
      if result_specs and collector:
        return result_specs, collector.samples
      return [spec], []

    run_start_msg = ('\n' + '-' * 85 + '\n' +
                     'Starting benchmark %s attempt %s of %s' + '\n' + '-' * 85)
    logging.info(run_start_msg, benchmark_info, current_run_count + 1,
                 max_run_count)
    collector = SampleCollector()
    # Make a new copy of the benchmark_spec for each run since currently a
    # benchmark spec isn't compatible with multiple runs. In particular, the
    # benchmark_spec doesn't correctly allow for a provision of resources
    # after tearing down.
    spec_for_run = copy.deepcopy(spec)
    result_specs.append(spec_for_run)
    try:
      RunBenchmark(spec_for_run, collector)
    except BaseException as e:  # pylint: disable=broad-except
      logging.exception('Exception running benchmark')
      msg = f'Benchmark {benchmark_info} failed.'
      if isinstance(e, KeyboardInterrupt) or FLAGS.stop_after_benchmark_failure:
        logging.error('%s Execution will not continue.', msg)
        _TEARDOWN_EVENT.set()
        break
      logging.error('%s Execution will continue.', msg)

    # Don't retry on the last run.
    if _ShouldRetry(spec_for_run) and current_run_count != max_run_count - 1:
      logging.info(
          'Benchmark should be retried. Waiting %s seconds before running.',
          _RETRY_DELAY_SECONDS.value)
      time.sleep(_RETRY_DELAY_SECONDS.value)

      # Handle smart retries if specified.
      zone_retry_manager.HandleSmartRetries(spec_for_run)

    else:
      logging.info(
          'Benchmark should not be retried. '
          'Finished %s runs of %s', current_run_count + 1, max_run_count)
      break

  # We need to return both the spec and samples so that we know
  # the status of the test and can publish any samples that
  # haven't yet been published.
  return result_specs, collector.samples


class ZoneRetryManager():
  """Encapsulates state and functions for zone retries.

  Attributes:
    _original_zone: If specified, the original zone provided to the benchmark.
    _zones_tried: Zones that have already been tried in previous runs.
  """

  def __init__(self):
    if not _SMART_CAPACITY_RETRY.value and not _SMART_QUOTA_RETRY.value:
      return
    self._zones_tried: Set[str] = set()
    self._utils: types.ModuleType = providers.LoadProviderUtils(FLAGS.cloud)
    self._SetOriginalZoneAndFlag()

  def _SetOriginalZoneAndFlag(self) -> None:
    """Records the flag name and zone value that the benchmark started with."""
    # This is guaranteed to set values due to flag validator.
    for zone_flag in ['zone', 'zones']:
      if FLAGS[zone_flag].value:
        self._original_zone = FLAGS[zone_flag].value[0]
        self._zone_flag = zone_flag

  def HandleSmartRetries(self, spec: bm_spec.BenchmarkSpec) -> None:
    """Handles smart zone retry flags if provided."""
    if (_SMART_QUOTA_RETRY.value and spec.failed_substatus
        == benchmark_status.FailedSubstatus.QUOTA):
      self._AssignZoneToNewRegion()
    elif (_SMART_CAPACITY_RETRY.value and spec.failed_substatus
          == benchmark_status.FailedSubstatus.INSUFFICIENT_CAPACITY):
      self._AssignNewZoneSameRegion()

  def _AssignZoneToNewRegion(self) -> None:
    """Changes zone to be a new zone in the same geo but different region."""
    region = self._utils.GetRegionFromZone(self._original_zone)
    geo = self._utils.GetGeoFromRegion(region)
    possible_zones = set()
    for new_region in self._utils.GetRegionsInGeo(geo):
      if new_region != region:
        zones = self._utils.GetZonesInRegion(new_region)
        possible_zones.update(zones)
    self._ChooseAndSetNewZone(possible_zones)

  def _AssignNewZoneSameRegion(self) -> None:
    """Changes zone to be a new zone in the same region."""
    region = self._utils.GetRegionFromZone(self._original_zone)
    possible_zones = self._utils.GetZonesInRegion(region)
    self._ChooseAndSetNewZone(possible_zones)

  def _ChooseAndSetNewZone(self, possible_zones: Set[str]) -> None:
    """Saves the current _zone_flag and sets it to a new zone.

    Args:
      possible_zones: The set of zones to choose from.
    """
    current_zone = FLAGS[self._zone_flag].value[0]
    self._zones_tried.add(current_zone)
    zones_to_try = possible_zones - self._zones_tried
    # Restart from empty if we've exhausted all alternatives.
    if not zones_to_try:
      self._zones_tried.clear()
      new_zone = self._original_zone
    else:
      new_zone = random.choice(tuple(zones_to_try))
    logging.info('Retry using new zone %s', new_zone)
    FLAGS[self._zone_flag].unparse()
    FLAGS[self._zone_flag].parse([new_zone])
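
# For example (zone names illustrative): with --zone=us-east1-b and
# --smart_capacity_retry, an INSUFFICIENT_CAPACITY failure leads the next
# attempt to parse a different zone in us-east1; once every alternative zone
# has been tried, the manager falls back to the original zone.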


def _LogCommandLineFlags():
  result = []
  for name in FLAGS:
    flag = FLAGS[name]
    if flag.present:
      result.append(flag.serialize())
  logging.info('Flag values:\n%s', '\n'.join(result))


def SetUpPKB():
  """Set globals and environment variables for PKB.

  After SetUpPKB() returns, it should be possible to call PKB
  functions, like benchmark_spec.Prepare() or benchmark_spec.Run().

  SetUpPKB() also modifies the local file system by creating a temp
  directory and storing new SSH keys.
  """
  try:
    _InitializeRunUri()
  except errors.Error as e:
    logging.error(e)
    sys.exit(1)

  # Initialize logging.
  vm_util.GenTempDir()
  if FLAGS.use_pkb_logging:
    log_util.ConfigureLogging(
        stderr_log_level=log_util.LOG_LEVELS[FLAGS.log_level],
        log_path=vm_util.PrependTempDir(LOG_FILE_NAME),
        run_uri=FLAGS.run_uri,
        file_log_level=log_util.LOG_LEVELS[FLAGS.file_log_level])
  logging.info('PerfKitBenchmarker version: %s', version.VERSION)

  # Translate deprecated flags and log all provided flag values.
  disk.WarnAndTranslateDiskFlags()
  _LogCommandLineFlags()

  # Register skip pending runs functionality.
  RegisterSkipPendingRunsCheck(_SkipPendingRunsFile)

  # Check environment.
  if not FLAGS.ignore_package_requirements:
    requirements.CheckBasicRequirements()

  for executable in REQUIRED_EXECUTABLES:
    if not vm_util.ExecutableOnPath(executable):
      raise errors.Setup.MissingExecutableError(
          'Could not find required executable "%s"' % executable)

  # Check mutually exclusive flags
  if FLAGS.run_stage_iterations > 1 and FLAGS.run_stage_time > 0:
    raise errors.Setup.InvalidFlagConfigurationError(
        'Flags run_stage_iterations and run_stage_time are mutually exclusive')

  vm_util.SSHKeyGen()

  if FLAGS.static_vm_file:
    with open(FLAGS.static_vm_file) as fp:
      static_virtual_machine.StaticVirtualMachine.ReadStaticVirtualMachineFile(
          fp)

  events.initialization_complete.send(parsed_flags=FLAGS)

  benchmark_lookup.SetBenchmarkModuleFunction(benchmark_sets.BenchmarkModule)
  package_lookup.SetPackageModuleFunction(benchmark_sets.PackageModule)

  # Update max_concurrent_threads to use at least as many threads as VMs. This
  # is important for the cluster_boot benchmark where we want to launch the VMs
  # in parallel.
  if not FLAGS.max_concurrent_threads:
    FLAGS.max_concurrent_threads = max(
        background_tasks.MAX_CONCURRENT_THREADS,
        FLAGS.num_vms)
    logging.info('Setting --max_concurrent_threads=%d.',
                 FLAGS.max_concurrent_threads)


def RunBenchmarkTasksInSeries(tasks):
  """Runs benchmarks in series.

  Arguments:
    tasks: list of tuples of task: [(RunBenchmarkTask, (spec,), {}),]

  Returns:
    list of tuples of func results
  """
  return [func(*args, **kwargs) for func, args, kwargs in tasks]
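

# A minimal usage sketch (illustrative only; mirrors how RunBenchmarks builds
# its task list below):
#
#   tasks = [(RunBenchmarkTask, (spec,), {}) for spec in benchmark_specs]
#   results = RunBenchmarkTasksInSeries(tasks)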


def RunBenchmarks():
  """Runs all benchmarks in PerfKitBenchmarker.

  Returns:
    Exit status for the process.
  """
  benchmark_specs = _CreateBenchmarkSpecs()
  if FLAGS.randomize_run_order:
    random.shuffle(benchmark_specs)
  if FLAGS.dry_run:
    print('PKB will run with the following configurations:')
    for spec in benchmark_specs:
      print(spec)
      print('')
    return 0

  benchmark_spec_lists = None
  collector = SampleCollector()
  try:
    tasks = [(RunBenchmarkTask, (spec,), {})
             for spec in benchmark_specs]
    if FLAGS.run_processes is None:
      spec_sample_tuples = RunBenchmarkTasksInSeries(tasks)
    else:
      spec_sample_tuples = background_tasks.RunParallelProcesses(
          tasks, FLAGS.run_processes, FLAGS.run_processes_delay)
    benchmark_spec_lists, sample_lists = list(zip(*spec_sample_tuples))
    for sample_list in sample_lists:
      collector.samples.extend(sample_list)

  finally:
    if collector.samples:
      collector.PublishSamples()
    # Use the last run in the series of runs.
    if benchmark_spec_lists:
      benchmark_specs = [spec_list[-1] for spec_list in benchmark_spec_lists]
    if benchmark_specs:
      logging.info(benchmark_status.CreateSummary(benchmark_specs))

    logging.info('Complete logs can be found at: %s',
                 vm_util.PrependTempDir(LOG_FILE_NAME))
    logging.info('Completion statuses can be found at: %s',
                 vm_util.PrependTempDir(COMPLETION_STATUS_FILE_NAME))

  if stages.TEARDOWN not in FLAGS.run_stage:
    logging.info(
        'To run again with this setup, please use --run_uri=%s', FLAGS.run_uri)

  if FLAGS.archive_bucket:
    archive.ArchiveRun(vm_util.GetTempDir(), FLAGS.archive_bucket,
                       gsutil_path=FLAGS.gsutil_path,
                       prefix=FLAGS.run_uri + '_')

  # Write completion status file(s).
  completion_status_file_name = (
      vm_util.PrependTempDir(COMPLETION_STATUS_FILE_NAME))
  with open(completion_status_file_name, 'w') as status_file:
    _WriteCompletionStatusFile(benchmark_specs, status_file)
  if FLAGS.completion_status_file:
    with open(FLAGS.completion_status_file, 'w') as status_file:
      _WriteCompletionStatusFile(benchmark_specs, status_file)

  all_benchmarks_succeeded = all(spec.status == benchmark_status.SUCCEEDED
                                 for spec in benchmark_specs)
  return 0 if all_benchmarks_succeeded else 1


def _GenerateBenchmarkDocumentation():
  """Generates benchmark documentation to show in --help."""
  benchmark_docs = []
  for benchmark_module in (linux_benchmarks.BENCHMARKS +
                           windows_benchmarks.BENCHMARKS):
    benchmark_config = configs.LoadMinimalConfig(
        benchmark_module.BENCHMARK_CONFIG, benchmark_module.BENCHMARK_NAME)
    vm_groups = benchmark_config.get('vm_groups', {})
    total_vm_count = 0
    vm_str = ''
    scratch_disk_str = ''
    for group in six.itervalues(vm_groups):
      group_vm_count = group.get('vm_count', 1)
      if group_vm_count is None:
        vm_str = 'variable'
      else:
        total_vm_count += group_vm_count
      if group.get('disk_spec'):
        scratch_disk_str = ' with scratch volume(s)'

    name = benchmark_module.BENCHMARK_NAME
    if benchmark_module in windows_benchmarks.BENCHMARKS:
      name += ' (Windows)'
    benchmark_docs.append('%s: %s (%s VMs%s)' %
                          (name,
                           benchmark_config['description'],
                           vm_str or total_vm_count,
                           scratch_disk_str))
  return '\n\t'.join(benchmark_docs)
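
# Each generated line has the form '<name>: <description> (<N> VMs<suffix>)',
# e.g. (names and descriptions illustrative only):
#   iperf: Run iperf (2 VMs)
#   fio: Runs fio (1 VMs with scratch volume(s))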


def _CreateLscpuSamples(vms):
  """Creates samples from linux VMs of lscpu output."""
  samples = []
  for vm in vms:
    if vm.OS_TYPE in os_types.LINUX_OS_TYPES:
      metadata = {'node_name': vm.name}
      metadata.update(vm.CheckLsCpu().data)
      samples.append(sample.Sample('lscpu', 0, '', metadata))
  return samples


def _CreateProcCpuSamples(vms):
  """Creates samples from linux VMs of lscpu output."""
  samples = []
  for vm in vms:
    if vm.OS_TYPE not in os_types.LINUX_OS_TYPES:
      continue
    data = vm.CheckProcCpu()
    metadata = {'node_name': vm.name}
    metadata.update(data.GetValues())
    samples.append(sample.Sample('proccpu', 0, '', metadata))
    metadata = {'node_name': vm.name}
    for processor_id, raw_values in data.mappings.items():
      values = ['%s=%s' % item for item in raw_values.items()]
      metadata['proc_{}'.format(processor_id)] = ';'.join(sorted(values))
    samples.append(sample.Sample('proccpu_mapping', 0, '', metadata))
  return samples
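
# Each proccpu_mapping sample carries one metadata entry per logical
# processor, e.g. (values illustrative only):
#   metadata['proc_0'] = 'core id=0;physical id=0'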


def _CreateCpuVulnerabilitySamples(vms) -> List[sample.Sample]:
  """Returns samples of the VMs' CPU vulernabilites."""

  def CreateSample(vm) -> Optional[sample.Sample]:
    metadata = {'vm_name': vm.name}
    metadata.update(vm.cpu_vulnerabilities.asdict)
    return sample.Sample('cpu_vuln', 0, '', metadata)

  linux_vms = [vm for vm in vms if vm.OS_TYPE in os_types.LINUX_OS_TYPES]
  return vm_util.RunThreaded(CreateSample, linux_vms)
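
# Each cpu_vuln sample's metadata holds the guest-reported vulnerability
# state; keys and values below are illustrative only:
#   {'vm_name': 'pkb-vm-0', 'spectre_v1': 'Mitigation: usercopy barriers'}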


def _CreateGccSamples(vms):
  """Creates samples from linux VMs of gcc version output."""

  def _GetGccMetadata(vm):
    return {
        'name': vm.name,
        'versiondump': build_tools.GetVersion(vm, 'gcc'),
        'versioninfo': build_tools.GetVersionInfo(vm, 'gcc')
    }

  return [
      sample.Sample('gcc_version', 0, '', metadata)
      for metadata in vm_util.RunThreaded(_GetGccMetadata, vms)
  ]


def _CreateGlibcSamples(vms):
  """Creates glibc samples from linux VMs of ldd output."""

  def _GetGlibcVersionInfo(vm):
    out, _ = vm.RemoteCommand('ldd --version', ignore_failure=True)
    # Return only the first line of output, e.g. (illustrative)
    # 'ldd (Ubuntu GLIBC 2.31-0ubuntu9) 2.31'; exact text varies by distro.
    return out.splitlines()[0] if out else None

  def _GetGlibcMetadata(vm):
    return {
        'name': vm.name,
        # TODO(user): Add glibc versiondump.
        'versioninfo': _GetGlibcVersionInfo(vm)
    }

  return [
      sample.Sample('glibc_version', 0, '', metadata)
      for metadata in vm_util.RunThreaded(_GetGlibcMetadata, vms)
  ]


def _ParseMeminfo(meminfo_txt: str) -> Tuple[Dict[str, int], List[str]]:
  """Returns the parsed /proc/meminfo data.

  Response has entries such as {'MemTotal' : 32887056, 'Inactive': 4576524}. If
  the /proc/meminfo entry has two values such as
    MemTotal: 32887056 kB
  checks that the last value is 'kB' If it is not then adds that line to the
  2nd value in the tuple.

  Args:
    meminfo_txt: contents of /proc/meminfo

  Returns:
    Tuple where the first entry is a dict of the parsed keys and the second
    are unparsed lines.
  """
  data: Dict[str, int] = {}
  malformed: List[str] = []
  for line in meminfo_txt.splitlines():
    try:
      key, full_value = re.split(r':\s+', line)
      parts = full_value.split()
      if len(parts) == 1 or (len(parts) == 2 and parts[1] == 'kB'):
        data[key] = int(parts[0])
      else:
        malformed.append(line)
    except ValueError:
      # The line does not match "key: value", or the value is not an integer.
      malformed.append(line)
  return data, malformed
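
# A minimal sketch of the parsing behavior (comment only, not executed):
#
#   data, malformed = _ParseMeminfo(
#       'MemTotal:       32887056 kB\n'
#       'HugePages_Total:       0\n'
#       'garbled line')
#   # data == {'MemTotal': 32887056, 'HugePages_Total': 0}
#   # malformed == ['garbled line']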


@events.samples_created.connect
def _CollectMeminfoHandler(sender: str, benchmark_spec: bm_spec.BenchmarkSpec,
                           samples: List[sample.Sample]) -> None:
  """Optionally creates /proc/meminfo samples.

  If the flag --collect_meminfo is set appends a sample.Sample of /proc/meminfo
  data for every VM in the run.

  Parameter names cannot be changed as the method is called by events.send with
  keyword arguments.

  Args:
    sender: Unused sender.
    benchmark_spec: The benchmark spec.
    samples: Generated samples that can be appended to.
  """
  del sender  # Unused; samples are appended using VMs from benchmark_spec.
  if not _COLLECT_MEMINFO.value:
    return

  def CollectMeminfo(vm):
    txt, _ = vm.RemoteCommand('cat /proc/meminfo')
    meminfo, malformed = _ParseMeminfo(txt)
    meminfo.update({
        'meminfo_keys': ','.join(sorted(meminfo)),
        'meminfo_vmname': vm.name,
        'meminfo_machine_type': vm.machine_type,
        'meminfo_os_type': vm.OS_TYPE,
    })
    if malformed:
      meminfo['meminfo_malformed'] = ','.join(sorted(malformed))
    return sample.Sample('meminfo', 0, '', meminfo)

  linux_vms = [
      vm for vm in benchmark_spec.vms if vm.OS_TYPE in os_types.LINUX_OS_TYPES
  ]

  samples.extend(vm_util.RunThreaded(CollectMeminfo, linux_vms))


def Main():
  log_util.ConfigureBasicLogging()
  _InjectBenchmarkInfoIntoDocumentation()
  _ParseFlags()
  if FLAGS.helpmatch:
    _PrintHelp(FLAGS.helpmatch)
    return 0
  if FLAGS.helpmatchmd:
    _PrintHelpMD(FLAGS.helpmatchmd)
    return 0
  CheckVersionFlag()
  SetUpPKB()
  return RunBenchmarks()
